Tip
阅读指南
想象你在构建一个AI代码生成助手。测试阶段表现很好,但正式使用后发现:AI生成的代码虽然能运行,但有时会违反团队的代码规范,甚至引入潜在的安全漏洞。
你立刻意识到:AI虽然高效,但生成的代码仍需要人类审核。完全自动化可能带来不可控的风险。
解决方案是什么?加入人工介入(Human-in-the-Loop)——让AI生成代码后暂停,等待工程师审核通过后再继续执行。
这就是本节要解决的问题:如何在自动化工作流中优雅地加入人工控制点,实现人机协作。
LangGraph通过编译时设置暂停点实现人工介入:
from langgraph.checkpoint.memory import MemorySaver
# 编译时指定暂停点
checkpointer = MemorySaver() # 必需:保存状态快照
app = workflow.compile(
interrupt_before=["review"], # 在review节点前暂停
checkpointer=checkpointer
)
两个关键要素:
interrupt_before: 指定在哪个节点前暂停工作流checkpointer: 保存工作流状态,支持暂停后恢复执行模式
人工介入采用两次调用模式:
config = {"configurable": {"thread_id": "session_001"}}
# 第一次调用:执行到暂停点
result = app.invoke({"requirement": "..."}, config)
# 工程师审核并更新决策
app.update_state(config, {"human_decision": "approve"})
# 第二次调用:从暂停点继续
result = app.invoke(None, config) # None表示从检查点恢复
关键点:
thread_id: 唯一标识一次工作流会话update_state(): 更新状态中的人工决策None,自动从检查点恢复需求分析
项目目标:构建一个AI代码生成系统,工程师通过键盘实时输入审核决策
业务流程:
完整源码:
samples/chapter9/code_review/code_review_workflow.py
from typing import TypedDict, Optional
class CodeState(TypedDict):
requirement: str # 需求描述
code: str # AI生成的代码
human_decision: Optional[str] # 工程师决策: approve/reject
final_code: str # 最终提交代码
关键字段: human_decision 初始为 None,工程师审核后更新为 approve/reject
完整源码:
samples/chapter9/code_review/code_review_workflow.py以下仅展示关键逻辑,完整实现请参考源码。
节点2:工程师审核 (暂停点)
def human_review(state: CodeState):
"""工程师审核节点(interrupt_before暂停点)"""
decision = state.get("human_decision")
if decision is None:
# 第一次到达:展示代码并暂停
print("⏸ 工作流已暂停,等待工程师审核")
print(f"代码:\n{state['code']}")
else:
# 第二次到达:工程师已决策
print(f"工程师决策: {decision}")
return state
关键设计:
human_decision 是否为 None 判断执行次数interrupt_before),展示代码update_state 设置决策from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
workflow = StateGraph(CodeState)
# 添加3个节点
workflow.add_node("generate", generate_code)
workflow.add_node("review", human_review)
workflow.add_node("commit", commit_code)
# 定义流程
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "review")
workflow.add_edge("review", "commit")
workflow.add_edge("commit", END)
# 编译:在review节点前暂停
checkpointer = MemorySaver()
app = workflow.compile(
interrupt_before=["review"],
checkpointer=checkpointer
)
流程:
START → [generate] → ⏸ 暂停点 → [review] → [commit] → END
这是人工介入最核心的概念:工作流需要调用两次 **invoke()**。
为什么需要两次调用
因为人工介入本质上是将一个完整的工作流切成两段:
┌─────────────────────────────────────────────────────────────┐
│ 完整工作流(无人工介入) │
│ START → generate → review → commit → END │
│ ▲ │
│ └─ 一次invoke()直接完成 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 人工介入工作流(切成两段) │
│ │
│ 【第一段】 【暂停】 【第二段】 │
│ START → generate → ⏸ 等待工程师 → review → commit → END │
│ ▲ │ ▲ │
│ │ │ │ │
│ 第一次invoke() 人工决策 第二次invoke() │
└─────────────────────────────────────────────────────────────┘
第一次调用:执行到暂停点
config = {"configurable": {"thread_id": "session_001"}}
# 第一次调用:从START开始,执行到review节点前
result = app.invoke(
{"requirement": "快速排序算法"}, # 传入初始输入
config=config
)
# 此时工作流状态:
# ✓ generate节点已执行(代码已生成)
# ⏸ review节点前暂停(因为interrupt_before)
# ✗ review和commit节点尚未执行
print(result["code"]) # 可以访问已生成的代码
print(result["human_decision"]) # None(尚未决策)
人工决策:更新状态
# 工程师查看代码后做出决策
app.update_state(config, {"human_decision": "approve"})
**update_state()** 的作用:
checkpointer 中的状态快照human_decision 字段,从 None 变为 approve第二次调用:从暂停点继续
# 第二次调用:从暂停点继续执行
result = app.invoke(
None, # 关键:传入None表示"从检查点恢复"
config # 必须使用相同的thread_id
)
# 此时工作流状态:
# ✓ 从review节点继续执行
# ✓ review节点读取到human_decision="approve"
# ✓ commit节点执行完成
# ✓ 到达END
print(result["final_code"]) # 最终提交的代码
关键点:
**None** 而不是初始输入 None 告诉LangGraph:"不要从头开始,从检查点恢复"**thread_id**为什么必须使用 checkpointer
在代码审核案例中,如果没有 checkpointer,工作流无法实现暂停/恢复:
# 错误:没有checkpointer
app = workflow.compile(interrupt_before=["review"])
# 报错:interrupt_before requires checkpointer
# ✓ 正确:提供checkpointer
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
app = workflow.compile(
interrupt_before=["review"],
checkpointer=checkpointer # 必需:保存状态快照
)
原因:暂停/恢复需要保存和恢复工作流状态,这正是 checkpointer 的职责。而CheckPointer有很多种类型。
内存存储 vs 持久化存储:
# MemorySaver:存储在内存中(进程结束即丢失)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver() # 适合演示和测试
# SqliteSaver:持久化到数据库(进程结束后仍保留)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
多个暂停点的工作流
在复杂业务场景中,可能需要多个人工审核点。LangGraph 支持在多个节点前设置暂停点。
from langgraph.checkpoint.memory import MemorySaver
# 编译时指定多个暂停点
checkpointer = MemorySaver()
app = workflow.compile(
interrupt_before=["review1", "review2"], # 列表形式,多个节点
checkpointer=checkpointer
)
流程:
START → [generate] → ⏸ 暂停点1 → [review1] → [test] → ⏸ 暂停点2 → [review2] → [deploy] → END
关键点:
invoke()update_state() 更新决策invoke(None, config) 继续执行thread_id| 中文 | English | 音标 | 说明 |
|---|---|---|---|
| 人工介入 | Human-in-the-Loop | /ˈhjuːmən ɪn ðə luːp/ | 工作流中设置暂停点等待人类审核和决策 |
| 检查点管理器 | Checkpointer | /ˈtʃekpɔɪntər/ | 保存工作流状态快照,支持暂停后恢复 |
| 中断 | Interrupt | /ˌɪntəˈrʌpt/ | 编译时设置 interrupt_before 在指定节点前暂停 |
| 恢复 | Resume | /rɪˈzuːm/ | 第二次 invoke(None) 从检查点继续执行 |
现在你已经掌握了 LangGraph 构建复杂工作流的能力。但你可能会发现一个新问题:每次让 AI 帮你写代码、审查代码、生成文档时,都要重复说明一遍规范和要求——"请用 Python 3.10+ 语法,遵循 PEP 8,所有函数要有类型注解。.."说了十遍、二十遍,是不是很烦?
接下来的 第10章 Skill,我们将学习如何把这些重复的指令和专业知识封装成可复用的模块,让 AI 自动遵循团队规范,就像给它配备了一套专业的"工作手册"。